import logging
from typing import Dict, List, Optional, Union

from pymilvus.client.abstract import AnnSearchRequest, BaseRanker
from pymilvus.client.constants import DEFAULT_CONSISTENCY_LEVEL
from pymilvus.client.embedding_list import EmbeddingList
from pymilvus.client.search_iterator import SearchIteratorV2
from pymilvus.client.types import (
    CompactionPlans,
    ExceptionsMessage,
    LoadedSegmentInfo,
    LoadState,
    OmitZeroDict,
    ReplicaInfo,
    ResourceGroupConfig,
    SegmentInfo,
)
from pymilvus.client.utils import convert_struct_fields_to_user_format, get_params, is_vector_type
from pymilvus.exceptions import (
    DataTypeNotMatchException,
    ErrorCode,
    MilvusException,
    ParamError,
    PrimaryKeyException,
    ServerVersionIncompatibleException,
)
from pymilvus.orm.collection import CollectionSchema, Function, FunctionScore, Highlighter
from pymilvus.orm.connections import connections
from pymilvus.orm.constants import FIELDS, METRIC_TYPE, TYPE, UNLIMITED
from pymilvus.orm.iterator import QueryIterator, SearchIterator
from pymilvus.orm.types import DataType

from ._utils import create_connection
from .base import BaseMilvusClient
from .check import validate_param
from .index import IndexParam, IndexParams

logger = logging.getLogger(__name__)


class MilvusClient(BaseMilvusClient):
    """The Milvus Client"""

    # pylint: disable=logging-too-many-args, too-many-instance-attributes

    def __init__(
        self,
        uri: str = "http://localhost:19530",
        user: str = "",
        password: str = "",
        db_name: str = "",
        token: str = "",
        timeout: Optional[float] = None,
        **kwargs,
    ) -> None:
        """Open a connection and record what kind of server is on the other end.

        This client is a thin convenience layer over pymilvus for the common use case of
        adding, deleting and searching data. Every argument below is handed to
        ``create_connection``; the value it returns is kept in ``self._using``.

        Args:
            uri (str, optional): The connection address to use to connect to the
                instance. Defaults to "http://localhost:19530". Another example:
                "https://username:password@in01-12a.aws-us-west-2.vectordb.zillizcloud.com:19538".
            user (str, optional): Forwarded to ``create_connection``. Defaults to "".
            password (str, optional): Forwarded to ``create_connection``. Defaults to "".
            db_name (str, optional): Forwarded to ``create_connection``. Defaults to "".
            token (str, optional): Forwarded to ``create_connection``. Defaults to "".
            timeout (float, optional): Timeout forwarded to ``create_connection``.
                Defaults to None.
                Unit: second
            **kwargs: Extra keyword arguments forwarded to ``create_connection``.
        """
        using = create_connection(
            uri, token, db_name, user=user, password=password, timeout=timeout, **kwargs
        )
        self._using = using

        # True only when the server reports itself as plain "milvus".
        server_type = self.get_server_type()
        self.is_self_hosted = bool(server_type == "milvus")

    def create_collection(
        self,
        collection_name: str,
        dimension: Optional[int] = None,
        primary_field_name: str = "id",  # default is "id"
        id_type: str = "int",  # or "string",
        vector_field_name: str = "vector",  # default is  "vector"
        metric_type: str = "COSINE",
        auto_id: bool = False,
        timeout: Optional[float] = None,
        schema: Optional[CollectionSchema] = None,
        index_params: Optional[IndexParams] = None,
        **kwargs,
    ):
        """Create a collection from an explicit schema or from quick-setup arguments.

        Two paths exist, selected by ``schema``:

        * ``schema`` given: the collection is created from it through
          ``_create_collection_with_schema`` together with ``index_params``. The
          quick-setup arguments (``dimension``, ``primary_field_name``, ``id_type``,
          ``vector_field_name``, ``metric_type``, ``auto_id``) are not used.
        * ``schema`` is None: the quick-setup arguments are handed to
          ``_fast_create_collection``. ``index_params`` is not used on this path.

        Args:
            collection_name (str): Name of the collection to create.
            dimension (int, optional): Vector dimension for the quick-setup path.
            primary_field_name (str): Primary key field name for the quick-setup path.
            id_type (str): Primary key type for the quick-setup path.
            vector_field_name (str): Vector field name for the quick-setup path.
            metric_type (str): Metric type for the quick-setup path.
            auto_id (bool): Auto-id flag for the quick-setup path.
            timeout (float, optional): Forwarded to the selected helper.
            schema (CollectionSchema, optional): Explicit schema; selects the schema path.
            index_params (IndexParams, optional): Indexes for the schema path.
            **kwargs: Forwarded to the selected helper.

        Returns:
            Whatever the selected helper returns.
        """
        if schema is not None:
            return self._create_collection_with_schema(
                collection_name, schema, index_params, timeout=timeout, **kwargs
            )

        # No schema supplied: fall back to the quick-setup path.
        return self._fast_create_collection(
            collection_name,
            dimension,
            primary_field_name=primary_field_name,
            id_type=id_type,
            vector_field_name=vector_field_name,
            metric_type=metric_type,
            auto_id=auto_id,
            timeout=timeout,
            **kwargs,
        )

    def _fast_create_collection(
        self,
        collection_name: str,
        dimension: int,
        primary_field_name: str = "id",  # default is "id"
        id_type: Union[DataType, str] = DataType.INT64,  # or "string",
        vector_field_name: str = "vector",  # default is  "vector"
        metric_type: str = "COSINE",
        auto_id: bool = False,
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Create, index and load a two-field collection (primary key + float vector).

        Steps, in order: validate ``dimension``; build a schema via ``create_schema``
        (``enable_dynamic_field`` defaults to True when the caller did not set it); add
        the primary key and a FLOAT_VECTOR field; create the collection; create an
        AUTOINDEX index on the vector field with ``metric_type``; load the collection.

        Args:
            collection_name (str): Name of the collection to create.
            dimension (int): Dimension of the vector field.
            primary_field_name (str): Name of the primary key field.
            id_type (Union[DataType, str]): "int"/DataType.INT64 for an INT64 key, or
                "string"/"str"/DataType.VARCHAR for a VARCHAR key.
            vector_field_name (str): Name of the vector field.
            metric_type (str): Metric type used for the AUTOINDEX index.
            auto_id (bool): Forwarded to ``create_schema``.
            timeout (float, optional): Forwarded to the create/index/load calls.
            **kwargs: Forwarded to ``create_schema`` and to the connection's
                ``create_collection``. ``max_length`` is additionally applied to a
                VARCHAR primary key; ``consistency_level`` defaults to
                DEFAULT_CONSISTENCY_LEVEL when absent.

        Raises:
            PrimaryKeyException: If ``id_type`` is not one of the accepted values.
        """
        validate_param("dimension", dimension, int)

        kwargs.setdefault("enable_dynamic_field", True)
        schema = self.create_schema(auto_id=auto_id, **kwargs)

        # Resolve the primary key data type from the accepted aliases.
        wants_int_pk = id_type in ("int", DataType.INT64)
        if not wants_int_pk and id_type not in ("string", "str", DataType.VARCHAR):
            raise PrimaryKeyException(message=ExceptionsMessage.PrimaryFieldType)
        pk_data_type = DataType.INT64 if wants_int_pk else DataType.VARCHAR

        # max_length only applies to a VARCHAR primary key.
        pk_field_options = (
            {"max_length": kwargs["max_length"]}
            if "max_length" in kwargs and pk_data_type == DataType.VARCHAR
            else {}
        )

        schema.add_field(primary_field_name, pk_data_type, is_primary=True, **pk_field_options)
        schema.add_field(vector_field_name, DataType.FLOAT_VECTOR, dim=dimension)
        schema.verify()

        conn = self._get_connection()
        kwargs.setdefault("consistency_level", DEFAULT_CONSISTENCY_LEVEL)
        conn.create_collection(collection_name, schema, timeout=timeout, **kwargs)

        vector_index = IndexParams()
        vector_index.add_index(vector_field_name, index_type="AUTOINDEX", metric_type=metric_type)
        self.create_index(collection_name, vector_index, timeout=timeout)
        self.load_collection(collection_name, timeout=timeout)

    def create_index(
        self,
        collection_name: str,
        index_params: IndexParams,
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Create every index described by ``index_params`` on a collection.

        Args:
            collection_name (str): Name of the collection; must be a ``str``.
            index_params (IndexParams): Index descriptions; must be a non-empty
                ``IndexParams``.
            timeout (float, optional): Forwarded to each index creation call.
            **kwargs: Forwarded to each index creation call.

        Raises:
            ParamError: If ``index_params`` contains no entries.
        """
        validate_param("collection_name", collection_name, str)
        validate_param("index_params", index_params, IndexParams)

        entry_count = len(index_params)
        if entry_count == 0:
            raise ParamError(message="IndexParams is empty, no index can be created")

        # One request per described index, in iteration order.
        for single_index in index_params:
            self._create_index(collection_name, single_index, timeout=timeout, **kwargs)

    def _create_index(
        self,
        collection_name: str,
        index_param: IndexParam,
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Create a single index through the connection.

        Args:
            collection_name (str): Name of the collection to index.
            index_param (IndexParam): Supplies the field name, the index configuration
                (``get_index_configs()``) and the index name.
            timeout (float, optional): Forwarded to the connection's ``create_index``.
            **kwargs: Forwarded to the connection's ``create_index``.
        """
        conn = self._get_connection()

        target_field = index_param.field_name
        index_configs = index_param.get_index_configs()
        index_name = index_param.index_name

        conn.create_index(
            collection_name,
            target_field,
            index_configs,
            timeout=timeout,
            index_name=index_name,
            **kwargs,
        )

    def insert(
        self,
        collection_name: str,
        data: Union[Dict, List[Dict]],
        timeout: Optional[float] = None,
        partition_name: Optional[str] = "",
        **kwargs,
    ) -> Dict:
        """Insert data into the collection.

        If the Milvus Client was initiated without an existing Collection, the first dict passed
        in will be used to initiate the collection.

        Args:
            data (List[Dict[str, any]]): A list of dicts to pass in. If list not provided, will
                cast to list.
            timeout (float, optional): The timeout to use, will override init timeout. Defaults
                to None.

        Raises:
            DataNotMatchException: If the data has missing fields an exception will be thrown.
            MilvusException: General Milvus error on insert.

        Returns:
            Dict: Number of rows that were inserted and the inserted primary key list.
        """
        # If no data provided, we cannot input anything
        if isinstance(data, Dict):
            data = [data]

        msg = "wrong type of argument 'data',"
        msg += f"expected 'Dict' or list of 'Dict', got '{type(data).__name__}'"

        if not isinstance(data, List):
            raise TypeError(msg)

        if len(data) == 0:
            return {"insert_count": 0, "ids": []}

        conn = self._get_connection()
        # Insert into the collection.
        try:
            res = conn.insert_rows(
                collection_name, data, partition_name=partition_name, timeout=timeout, **kwargs
            )
        except Exception as ex:
            raise ex from ex
        return OmitZeroDict(
            {
                "insert_count": res.insert_count,
                "ids": res.primary_keys,
                "cost": res.cost,
            }
        )

    def upsert(
        self,
        collection_name: str,
        data: Union[Dict, List[Dict]],
        timeout: Optional[float] = None,
        partition_name: Optional[str] = "",
        **kwargs,
    ) -> Dict:
        """Upsert data into the collection.

        Args:
            collection_name (str): Name of the collection to upsert into.
            data (List[Dict[str, any]]): A list of dicts to pass in. If list not provided, will
                cast to list.
            timeout (float, optional): The timeout to use, will override init timeout. Defaults
                to None.
            partition_name (str, optional): Name of the partition to upsert into.
            **kwargs (dict): Extra keyword arguments.

                * *partial_update* (bool, optional): Whether this is a partial update operation.
                    If True, only the specified fields will be updated while others remain unchanged
                    Default is False.

        Raises:
            DataNotMatchException: If the data has missing fields an exception will be thrown.
            MilvusException: General Milvus error on upsert.

        Returns:
            Dict: Number of rows that were upserted.
        """
        # If no data provided, we cannot input anything
        if isinstance(data, Dict):
            data = [data]

        msg = "wrong type of argument 'data',"
        msg += f"expected 'Dict' or list of 'Dict', got '{type(data).__name__}'"

        if not isinstance(data, List):
            raise TypeError(msg)

        if len(data) == 0:
            return {"upsert_count": 0, "ids": []}

        conn = self._get_connection()
        # Upsert into the collection.
        try:
            res = conn.upsert_rows(
                collection_name, data, partition_name=partition_name, timeout=timeout, **kwargs
            )
        except Exception as ex:
            raise ex from ex

        return OmitZeroDict(
            {
                "upsert_count": res.upsert_count,
                "cost": res.cost,
                # milvus server supports upsert on autoid=ture from v2.4.15
                # upsert on autoid=ture will return new ids for user
                "ids": res.primary_keys,
            }
        )

    def hybrid_search(
        self,
        collection_name: str,
        reqs: List[AnnSearchRequest],
        ranker: Union[BaseRanker, Function],
        limit: int = 10,
        output_fields: Optional[List[str]] = None,
        timeout: Optional[float] = None,
        partition_names: Optional[List[str]] = None,
        **kwargs,
    ) -> List[List[dict]]:
        """Run several vector searches and merge their results with a ranker.

        All arguments are forwarded to the connection's ``hybrid_search``.

        Args:
            collection_name (str): The name of the collection.
            reqs (List[AnnSearchRequest]): The vector search requests.
            ranker (Union[BaseRanker, Function]): The ranker used to rearrange results.
            limit (int): The max number of returned records, also known as ``topk``.
                Defaults to 10.
            output_fields (List[str], optional): The names of fields to return in the
                search result. Can only get scalar fields.
            timeout (float, optional): A duration of time in seconds to allow for the
                RPC. If timeout is None, the client keeps waiting until the server
                responds or an error occurs.
            partition_names (List[str], optional): The names of partitions to search on.
            **kwargs (dict): Optional search params, forwarded unchanged.

                * *round_decimal* (int, optional)
                    The specified number of decimal places of returned distance.
                    Defaults to -1, meaning the returned distance is not rounded.

                * *offset* (int, optional)
                    Offset for pagination.

                * *consistency_level* (str/int, optional)
                    Which consistency level to use when searching in the collection.

                    Options of consistency level: Strong, Bounded, Eventually, Session,
                    Customized.

                    Note: this parameter overwrites the one specified when creating the
                    collection; if no consistency level is given here, the search uses
                    the consistency level set when the collection was created.

        Returns:
            List[List[dict]]: A nested list of dicts containing the result data.

        Raises:
            MilvusException: If anything goes wrong.
        """
        search_options = {
            "limit": limit,
            "partition_names": partition_names,
            "output_fields": output_fields,
            "timeout": timeout,
        }
        handler = self._get_connection()
        return handler.hybrid_search(collection_name, reqs, ranker, **search_options, **kwargs)

    def search(
        self,
        collection_name: str,
        data: Union[List[list], list],
        filter: str = "",
        limit: int = 10,
        output_fields: Optional[List[str]] = None,
        search_params: Optional[dict] = None,
        timeout: Optional[float] = None,
        partition_names: Optional[List[str]] = None,
        anns_field: Optional[str] = None,
        ranker: Optional[Union[Function, FunctionScore]] = None,
        highlighter: Optional[Highlighter] = None,
        **kwargs,
    ) -> List[List[dict]]:
        """Search a collection for a query vector/vectors.

        Args:
            collection_name (str): Name of the collection to search.
            data (Union[List[list], list, List[EmbeddingList]]): The vector/vectors/embedding
                list to search. When the first element is an ``EmbeddingList``, every
                element is flattened with ``to_flat_array()`` and ``is_embedding_list=True``
                is added to the forwarded keyword arguments.
            filter (str, optional): A filter expression for the search. Defaults to "".
            limit (int, optional): How many results to return per search. Defaults to 10.
            output_fields (List[str], optional): List of which field values to return.
            search_params (dict, optional): The search params to use for the search. An
                empty dict is sent when omitted.
            timeout (float, optional): Forwarded to the connection's ``search``.
                Defaults to None.
            partition_names (List[str], optional): Forwarded to the connection's ``search``.
            anns_field (str, optional): Vector field to search; "" is sent when omitted.
            ranker (Function or FunctionScore, optional): The ranker to use for the search.
            highlighter (Highlighter, optional): Forwarded to the connection's ``search``.
            **kwargs: Forwarded to the connection's ``search``. ``filter_params`` is
                removed from it and sent as ``expr_params``.

        Returns:
            List[List[dict]]: A nested list of dicts containing the result data.
        """
        # EmbeddingList inputs are sent as flat arrays plus a marker flag.
        holds_embedding_lists = (
            isinstance(data, list) and data and isinstance(data[0], EmbeddingList)
        )
        if holds_embedding_lists:
            data = [embedding.to_flat_array() for embedding in data]
            kwargs["is_embedding_list"] = True

        conn = self._get_connection()

        # Pull filter_params out first so it is not forwarded twice.
        expr_params = kwargs.pop("filter_params", {})
        target_field = anns_field or ""
        effective_params = search_params or {}

        return conn.search(
            collection_name,
            data,
            target_field,
            effective_params,
            expression=filter,
            limit=limit,
            output_fields=output_fields,
            partition_names=partition_names,
            expr_params=expr_params,
            timeout=timeout,
            ranker=ranker,
            highlighter=highlighter,
            **kwargs,
        )

    def query(
        self,
        collection_name: str,
        filter: str = "",
        output_fields: Optional[List[str]] = None,
        timeout: Optional[float] = None,
        ids: Optional[Union[List, str, int]] = None,
        partition_names: Optional[List[str]] = None,
        **kwargs,
    ) -> List[dict]:
        """Query for entries in the Collection.

        Args:
            filter (str): The filter to use for the query.
            output_fields (List[str], optional): List of which field values to return. If None
                specified, all fields excluding vector field will be returned.
            partitions (List[str], optional): Which partitions to perform query. Defaults to None.
            timeout (float, optional): Timeout to use, overides the client level assigned at init.
                Defaults to None.

        Raises:
            ValueError: Missing collection.

        Returns:
            List[dict]: A list of result dicts, vectors are not included.
        """
        if filter and not isinstance(filter, str):
            raise DataTypeNotMatchException(message=ExceptionsMessage.ExprType % type(filter))

        if filter and ids is not None:
            raise ParamError(message=ExceptionsMessage.AmbiguousQueryFilterParam)

        if isinstance(ids, (int, str)):
            ids = [ids]

        conn = self._get_connection()

        if ids:
            schema_dict, _ = conn._get_schema_from_cache_or_remote(collection_name, timeout=timeout)
            filter = self._pack_pks_expr(schema_dict, ids)

        if not output_fields:
            output_fields = ["*"]

        return conn.query(
            collection_name,
            expr=filter,
            output_fields=output_fields,
            partition_names=partition_names,
            timeout=timeout,
            expr_params=kwargs.pop("filter_params", {}),
            **kwargs,
        )

    def query_iterator(
        self,
        collection_name: str,
        batch_size: Optional[int] = 1000,
        limit: Optional[int] = UNLIMITED,
        filter: Optional[str] = "",
        output_fields: Optional[List[str]] = None,
        partition_names: Optional[List[str]] = None,
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Build a ``QueryIterator`` over a collection.

        Args:
            collection_name (str): Name of the collection to query.
            batch_size (int, optional): Forwarded to ``QueryIterator``. Defaults to 1000.
            limit (int, optional): Forwarded to ``QueryIterator``. Defaults to UNLIMITED.
            filter (str, optional): Filter expression, sent as ``expr``. Defaults to "".
            output_fields (List[str], optional): Forwarded to ``QueryIterator``.
            partition_names (List[str], optional): Forwarded to ``QueryIterator``.
            timeout (float, optional): Forwarded to the ``describe_collection`` call and
                to ``QueryIterator``.
            **kwargs: Forwarded to both the ``describe_collection`` call and
                ``QueryIterator``.

        Raises:
            DataTypeNotMatchException: If ``filter`` is neither None nor a ``str``.

        Returns:
            QueryIterator: The constructed iterator.
        """
        if not (filter is None or isinstance(filter, str)):
            raise DataTypeNotMatchException(message=ExceptionsMessage.ExprType % type(filter))

        conn = self._get_connection()
        # The iterator is given the collection description as its schema.
        collection_schema = conn.describe_collection(collection_name, timeout=timeout, **kwargs)

        iterator = QueryIterator(
            connection=conn,
            collection_name=collection_name,
            batch_size=batch_size,
            limit=limit,
            expr=filter,
            output_fields=output_fields,
            partition_names=partition_names,
            schema=collection_schema,
            timeout=timeout,
            **kwargs,
        )
        return iterator

    def search_iterator(
        self,
        collection_name: str,
        data: Union[List[list], list],
        batch_size: Optional[int] = 1000,
        filter: Optional[str] = None,
        limit: Optional[int] = UNLIMITED,
        output_fields: Optional[List[str]] = None,
        search_params: Optional[dict] = None,
        timeout: Optional[float] = None,
        partition_names: Optional[List[str]] = None,
        anns_field: Optional[str] = None,
        round_decimal: int = -1,
        **kwargs,
    ) -> Union[SearchIteratorV2, SearchIterator]:
        """Creates an iterator for searching vectors in batches.

        This method returns an iterator that performs vector similarity search in batches,
        which is useful when dealing with large result sets. It first tries to build a
        Search Iterator V2; if that raises ``ServerVersionIncompatibleException`` a warning
        is logged and a V1 ``SearchIterator`` is built instead.

        For the V1 path this method resolves two things itself:

        * ``anns_field``: when omitted, the collection must have exactly one vector field,
          which is then used.
        * the metric type: when ``search_params`` does not contain it, it is read from the
          params of the index on ``anns_field``.

        The caller's ``search_params`` dict is never modified; the V1 path works on a copy.

        Args:
            collection_name (str): Name of the collection to search in.
            data (Union[List[list], list]): Vector data to search with. For V2, only single vector
                search is supported.
            batch_size (int, optional): Number of results to fetch per batch. Defaults to 1000.
                Must be between 1 and MAX_BATCH_SIZE.
            filter (str, optional): Filtering expression to filter the results. Defaults to None.
            limit (int, optional): Total number of results to return. Defaults to UNLIMITED.
                (Deprecated) This parameter is deprecated and will be removed in a future release.
            output_fields (List[str], optional): Fields to return in the results.
            search_params (dict, optional): Parameters for the search operation.
            timeout (float, optional): Timeout in seconds for each RPC call.
            partition_names (List[str], optional): Names of partitions to search in.
            anns_field (str, optional): Name of the vector field to search. Can be empty when
                there is only one vector field in the collection.
            round_decimal (int, optional): Number of decimal places for distance values.
                Defaults to -1 (no rounding).
            **kwargs: Additional arguments to pass to the search operation.

        Returns:
            Union[SearchIteratorV2, SearchIterator]: An iterator object that yields search
                results in batches.

        Raises:
            DataTypeNotMatchException: On the V1 path, if ``filter`` is neither None nor a
                ``str``.
            MilvusException: On the V1 path, if ``anns_field`` is omitted and the collection
                has zero or several vector fields, or the field name cannot be determined.
            ParamError: On the V1 path, if no metric type can be determined for
                ``anns_field``. ``ParamError`` is expected to be a ``MilvusException``
                subclass (confirm against ``pymilvus.exceptions``), so existing handlers
                for ``MilvusException`` keep working.

        Examples:
            >>> # Search with iterator
            >>> iterator = client.search_iterator(
            ...     collection_name="my_collection",
            ...     data=[[0.1, 0.2]],
            ...     batch_size=100
            ... )
        """

        conn = self._get_connection()

        # compatibility logic, change this when support get version from server
        try:
            return SearchIteratorV2(
                connection=conn,
                collection_name=collection_name,
                data=data,
                batch_size=batch_size,
                limit=limit,
                filter=filter,
                output_fields=output_fields,
                search_params=search_params or {},
                timeout=timeout,
                partition_names=partition_names,
                anns_field=anns_field or "",
                round_decimal=round_decimal,
                **kwargs,
            )
        except ServerVersionIncompatibleException:
            # for compatibility, return search_iterator V1
            logger.warning(ExceptionsMessage.SearchIteratorV2FallbackWarning)

        # Everything below is the search_iterator V1 path.
        if filter is not None and not isinstance(filter, str):
            raise DataTypeNotMatchException(message=ExceptionsMessage.ExprType % type(filter))

        # set up schema for iterator
        schema_dict = conn.describe_collection(collection_name, timeout=timeout, **kwargs)

        # If anns_field is not provided: use the only vector field, or abort when the
        # collection has none or more than one.
        if not anns_field:
            vector_fields = [
                field for field in schema_dict[FIELDS] if is_vector_type(field[TYPE])
            ]
            if not vector_fields:
                raise MilvusException(
                    code=ErrorCode.UNEXPECTED_ERROR,
                    message="there should be at least one vector field in milvus collection",
                )
            if len(vector_fields) > 1:
                raise MilvusException(
                    code=ErrorCode.UNEXPECTED_ERROR,
                    message="must specify anns_field when there are more than one vector field",
                )
            anns_field = vector_fields[0]["name"]
            if not anns_field:
                raise MilvusException(
                    code=ErrorCode.UNEXPECTED_ERROR,
                    message=f"cannot get anns_field name for search iterator, got:{anns_field}",
                )

        # Work on a copy so the metric type and "params" added below do not leak into
        # the dict owned by the caller.
        search_params = dict(search_params) if search_params else {}

        # set up metrics type for search_iterator which is mandatory
        if METRIC_TYPE not in search_params:
            indexes = list(conn.list_indexes(collection_name))
            # Prefer the index built on the searched field. Matching the index *name*
            # against the field name is kept as a fallback for backward compatibility.
            # NOTE(review): assumes index descriptions expose `field_name`; when they do
            # not, getattr() makes this degrade to the name-based match only.
            by_field = [ix for ix in indexes if getattr(ix, "field_name", None) == anns_field]
            by_name = [ix for ix in indexes if ix.index_name == anns_field]
            for candidates in (by_field, by_name):
                for index in candidates:
                    for param in index.params:
                        if param.key == METRIC_TYPE:
                            search_params[METRIC_TYPE] = param.value
                if METRIC_TYPE in search_params:
                    break
        if METRIC_TYPE not in search_params:
            # Raise ParamError directly: passing the ParamError class as the first
            # positional argument of MilvusException would be taken as the error code.
            raise ParamError(message=f"Cannot set up metrics type for anns_field:{anns_field}")

        search_params["params"] = get_params(search_params)

        return SearchIterator(
            connection=conn,
            collection_name=collection_name,
            data=data,
            ann_field=anns_field,
            param=search_params,
            batch_size=batch_size,
            limit=limit,
            expr=filter,
            partition_names=partition_names,
            output_fields=output_fields,
            timeout=timeout,
            round_decimal=round_decimal,
            schema=schema_dict,
            **kwargs,
        )

    def get(
        self,
        collection_name: str,
        ids: Union[list, str, int],
        output_fields: Optional[List[str]] = None,
        timeout: Optional[float] = None,
        partition_names: Optional[List[str]] = None,
        **kwargs,
    ) -> List[dict]:
        """Grab the inserted vectors using the primary key from the Collection.

        Due to current implementations, grabbing a large amount of vectors is slow.

        Args:
            ids (str): The pk's to get vectors for. Depending on pk_field type it can be int or str
            or a list of either.
            timeout (float, optional): Timeout to use, overides the client level assigned at
                init. Defaults to None.

        Raises:
            ValueError: Missing collection.

        Returns:
            List[dict]: A list of result dicts with keys {pk_field, vector_field}
        """
        if not isinstance(ids, list):
            ids = [ids]

        if len(ids) == 0:
            return []

        conn = self._get_connection()
        schema_dict, _ = conn._get_schema_from_cache_or_remote(collection_name, timeout=timeout)

        if not output_fields:
            output_fields = ["*"]

        expr = self._pack_pks_expr(schema_dict, ids)
        return conn.query(
            collection_name,
            expr=expr,
            output_fields=output_fields,
            partition_names=partition_names,
            timeout=timeout,
            **kwargs,
        )

    def delete(
        self,
        collection_name: str,
        ids: Optional[Union[list, str, int]] = None,
        timeout: Optional[float] = None,
        filter: Optional[str] = None,
        partition_name: Optional[str] = None,
        **kwargs,
    ) -> Dict[str, int]:
        """Delete entries in the collection by their pk or by filter.

        Starting from version 2.3.2, Milvus no longer includes the primary keys in the result
        when processing the delete operation on expressions.
        This change is due to the large amount of data involved.
        The delete interface no longer returns any results.
        If no exceptions are thrown, it indicates a successful deletion.
        However, for backward compatibility, If the primary_keys returned from old
        Milvus(previous 2.3.2) is not empty, the list of primary keys is still returned.

        Args:
            ids (list, str, int, optional): The pk's to delete.
                Depending on pk_field type it can be int or str or a list of either.
                Default to None.
            filter(str, optional): A filter to use for the deletion. Defaults to none.
            timeout (int, optional): Timeout to use, overides the client level assigned at init.
                Defaults to None.

            Note: You need to passin either ids or filter, and they cannot be used at the same time.

        Returns:
            Dict: with key 'deleted_count' and value number of rows that were deleted.
        """
        pks = kwargs.get("pks", [])
        if isinstance(pks, (int, str)):
            pks = [pks]

        for pk in pks:
            if not isinstance(pk, (int, str)):
                msg = f"wrong type of argument pks, expect list, int or str, got '{type(pk).__name__}'"
                raise TypeError(msg)

        if ids is not None:
            if isinstance(ids, (int, str)):
                pks.append(ids)
            elif isinstance(ids, list):
                for id in ids:
                    if not isinstance(id, (int, str)):
                        msg = f"wrong type of argument ids, expect list, int or str, got '{type(id).__name__}'"
                        raise TypeError(msg)
                pks.extend(ids)
            else:
                msg = f"wrong type of argument ids, expect list, int or str, got '{type(ids).__name__}'"
                raise TypeError(msg)

        # validate ambiguous delete filter param before describe collection rpc
        if filter and len(pks) > 0:
            raise ParamError(message=ExceptionsMessage.AmbiguousDeleteFilterParam)

        expr = ""
        conn = self._get_connection()
        if len(pks) > 0:
            schema_dict, _ = conn._get_schema_from_cache_or_remote(collection_name, timeout=timeout)
            expr = self._pack_pks_expr(schema_dict, pks)
        else:
            if not isinstance(filter, str):
                raise DataTypeNotMatchException(message=ExceptionsMessage.ExprType % type(filter))
            expr = filter

        ret_pks = []
        res = conn.delete(
            collection_name=collection_name,
            expression=expr,
            partition_name=partition_name,
            expr_params=kwargs.pop("filter_params", {}),
            timeout=timeout,
            **kwargs,
        )
        if res.primary_keys:
            ret_pks.extend(res.primary_keys)

        # compatible with deletions that returns primary keys
        if ret_pks:
            return ret_pks

        return OmitZeroDict({"delete_count": res.delete_count, "cost": res.cost})

    def get_collection_stats(self, collection_name: str, timeout: Optional[float] = None) -> Dict:
        conn = self._get_connection()
        stats = conn.get_collection_stats(collection_name, timeout=timeout)
        result = {stat.key: stat.value for stat in stats}
        if "row_count" in result:
            result["row_count"] = int(result["row_count"])
        return result

    def describe_collection(self, collection_name: str, timeout: Optional[float] = None, **kwargs):
        conn = self._get_connection()
        result = conn.describe_collection(collection_name, timeout=timeout, **kwargs)
        # Convert internal struct_array_fields to user-friendly format
        if isinstance(result, dict) and "struct_array_fields" in result:
            converted_fields = convert_struct_fields_to_user_format(result["struct_array_fields"])
            result["fields"].extend(converted_fields)
            # Remove internal struct_array_fields from user-facing response
            result.pop("struct_array_fields")

        return result

    def has_collection(self, collection_name: str, timeout: Optional[float] = None, **kwargs):
        conn = self._get_connection()
        return conn.has_collection(collection_name, timeout=timeout, **kwargs)

    def list_collections(self, **kwargs):
        conn = self._get_connection()
        return conn.list_collections(**kwargs)

    def drop_collection(self, collection_name: str, timeout: Optional[float] = None, **kwargs):
        """Delete the collection stored in this object"""
        conn = self._get_connection()
        conn.drop_collection(collection_name, timeout=timeout, **kwargs)

    def rename_collection(
        self,
        old_name: str,
        new_name: str,
        target_db: Optional[str] = "",
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Rename a collection through the connection's ``rename_collections``.

        Args:
            old_name (``str``): The current name of the collection.
            new_name (``str``): The new name of the collection.
            target_db (``str``, optional): Passed as the third positional argument of the
                connection call. Defaults to ``""``.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().rename_collections(
            old_name, new_name, target_db, timeout=timeout, **kwargs
        )

    def _create_collection_with_schema(
        self,
        collection_name: str,
        schema: CollectionSchema,
        index_params: IndexParams,
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Create a collection from a schema and, if index params are given, index and load it.

        Args:
            collection_name (``str``): The name of the collection to create.
            schema (CollectionSchema): The schema; ``schema.verify()`` is called first.
            index_params (IndexParams): When truthy, ``create_index`` and then
                ``load_collection`` are called for the new collection.
            timeout (``float``, optional): Forwarded to every call made here.
            **kwargs: Forwarded to the connection's ``create_collection`` only;
                ``consistency_level`` defaults to ``DEFAULT_CONSISTENCY_LEVEL`` when absent.
        """
        schema.verify()

        kwargs.setdefault("consistency_level", DEFAULT_CONSISTENCY_LEVEL)
        self._get_connection().create_collection(collection_name, schema, timeout=timeout, **kwargs)

        if not index_params:
            return

        # Index creation and loading happen only when index params were supplied.
        self.create_index(collection_name, index_params, timeout=timeout)
        self.load_collection(collection_name, timeout=timeout)

    def close(self):
        """Remove this client's connection (``self._using``) from the connections registry."""
        alias = self._using
        connections.remove_connection(alias)

    def load_collection(self, collection_name: str, timeout: Optional[float] = None, **kwargs):
        """Load a collection through the connection's ``load_collection``.

        Args:
            collection_name (``str``): The name of the collection to load.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().load_collection(collection_name, timeout=timeout, **kwargs)

    def release_collection(self, collection_name: str, timeout: Optional[float] = None, **kwargs):
        """Release a collection through the connection's ``release_collection``.

        Args:
            collection_name (``str``): The name of the collection to release.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().release_collection(collection_name, timeout=timeout, **kwargs)

    def get_load_state(
        self,
        collection_name: str,
        partition_name: Optional[str] = "",
        timeout: Optional[float] = None,
        **kwargs,
    ) -> Dict:
        """Report the load state of a collection or of one of its partitions.

        Args:
            collection_name (``str``): The name of the collection.
            partition_name (``str``, optional): When non-empty, the state is queried for
                this single partition; otherwise ``None`` is passed as the partition list.
            timeout (``float``, optional): Forwarded to the connection calls.
            **kwargs: Forwarded to the connection's ``get_load_state`` only.

        Returns:
            Dict: ``{"state": <state>}``. While the state equals ``LoadState.Loading`` a
            ``"progress"`` entry holding the connection's ``get_loading_progress`` result
            is added.
        """
        conn = self._get_connection()
        partition_names = [partition_name] if partition_name else None

        # Connection errors propagate unchanged. The former `raise ex from ex` wrapper
        # added nothing except making the exception its own __cause__.
        state = conn.get_load_state(collection_name, partition_names, timeout=timeout, **kwargs)

        ret = {"state": state}
        if state == LoadState.Loading:
            ret["progress"] = conn.get_loading_progress(
                collection_name, partition_names, timeout=timeout
            )

        return ret

    def refresh_load(self, collection_name: str, timeout: Optional[float] = None, **kwargs):
        """Call the connection's ``load_collection`` with ``_refresh=True``.

        Args:
            collection_name (``str``): The name of the collection.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged, except that any
                caller-supplied ``_refresh`` is discarded.
        """
        # Filter out a caller-supplied "_refresh" so it cannot collide with the flag below.
        forwarded = {key: value for key, value in kwargs.items() if key != "_refresh"}
        self._get_connection().load_collection(
            collection_name, timeout=timeout, _refresh=True, **forwarded
        )

    def list_indexes(self, collection_name: str, field_name: Optional[str] = "", **kwargs):
        """List the index names of a collection, optionally restricted to one field.

        Args:
            collection_name (``str``): The name of the collection.
            field_name (``str``, optional): When non-empty, only indexes whose
                ``field_name`` equals this value are listed; otherwise all are listed.
            **kwargs: Extra keyword arguments forwarded to the connection's ``list_indexes``.

        Returns:
            list: The ``index_name`` of every matching index. Falsy entries in the
            connection's result are skipped.
        """
        all_indexes = self._get_connection().list_indexes(collection_name, **kwargs)
        return [
            idx.index_name
            for idx in all_indexes
            if idx and (not field_name or idx.field_name == field_name)
        ]

    def drop_index(
        self, collection_name: str, index_name: str, timeout: Optional[float] = None, **kwargs
    ):
        """Drop an index through the connection's ``drop_index``.

        Args:
            collection_name (``str``): The name of the collection.
            index_name (``str``): The name of the index to drop.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        # The second positional argument of the connection call is always sent as ""
        # (presumably the field name -- confirm against the connection API).
        self._get_connection().drop_index(
            collection_name, "", index_name, timeout=timeout, **kwargs
        )

    def describe_index(
        self, collection_name: str, index_name: str, timeout: Optional[float] = None, **kwargs
    ) -> Dict:
        """Return the connection's ``describe_index`` result for an index.

        Args:
            collection_name (``str``): The name of the collection.
            index_name (``str``): The name of the index.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        return self._get_connection().describe_index(
            collection_name, index_name, timeout=timeout, **kwargs
        )

    def alter_index_properties(
        self,
        collection_name: str,
        index_name: str,
        properties: dict,
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Alter index properties through the connection's ``alter_index_properties``.

        Args:
            collection_name (``str``): The name of the collection.
            index_name (``str``): The name of the index.
            properties (``dict``): The properties, passed as ``properties=``.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().alter_index_properties(
            collection_name,
            index_name,
            properties=properties,
            timeout=timeout,
            **kwargs,
        )

    def drop_index_properties(
        self,
        collection_name: str,
        index_name: str,
        property_keys: List[str],
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Drop index properties through the connection's ``drop_index_properties``.

        Args:
            collection_name (``str``): The name of the collection.
            index_name (``str``): The name of the index.
            property_keys (``List[str]``): The property keys, passed as ``property_keys=``.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().drop_index_properties(
            collection_name,
            index_name,
            property_keys=property_keys,
            timeout=timeout,
            **kwargs,
        )

    def alter_collection_properties(
        self, collection_name: str, properties: dict, timeout: Optional[float] = None, **kwargs
    ):
        """Alter collection properties through the connection's ``alter_collection_properties``.

        Args:
            collection_name (``str``): The name of the collection.
            properties (``dict``): The properties, passed as ``properties=``.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().alter_collection_properties(
            collection_name, properties=properties, timeout=timeout, **kwargs
        )

    def drop_collection_properties(
        self,
        collection_name: str,
        property_keys: List[str],
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Drop collection properties through the connection's ``drop_collection_properties``.

        Args:
            collection_name (``str``): The name of the collection.
            property_keys (``List[str]``): The property keys, passed as ``property_keys=``.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().drop_collection_properties(
            collection_name,
            property_keys=property_keys,
            timeout=timeout,
            **kwargs,
        )

    def alter_collection_field(
        self,
        collection_name: str,
        field_name: str,
        field_params: dict,
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Alter a field through the connection's ``alter_collection_field_properties``.

        Args:
            collection_name (``str``): The name of the collection.
            field_name (``str``): The name of the field, passed as ``field_name=``.
            field_params (``dict``): The field parameters, passed as ``field_params=``.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().alter_collection_field_properties(
            collection_name, field_name=field_name, field_params=field_params, timeout=timeout, **kwargs
        )

    def add_collection_field(
        self,
        collection_name: str,
        field_name: str,
        data_type: DataType,
        desc: str = "",
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Add a new field to the collection.

        Args:
            collection_name (``str``): The name of collection.
            field_name (``str``): The name of the field.
            data_type (DataType): The data type of the field.
            desc (``str``): The description of the field.
            timeout (``float``, optional): A duration of time in seconds to allow for the RPC.
                If timeout is set to None, the client keeps waiting until the server
                responds or an error occurs.
            **kwargs (``dict``): Optional field params. They are passed both to
                ``create_field_schema`` and to the connection's ``add_collection_field``.
                nullable: bool, indicates field is nullable or not, shall be ``True`` for now
                default_value: default val for added field

        Raises:
            MilvusException: If anything goes wrong
        """
        # Build the schema first so an invalid field definition fails before the
        # connection is fetched.
        field_schema = self.create_field_schema(field_name, data_type, desc, **kwargs)
        self._get_connection().add_collection_field(
            collection_name,
            field_schema,
            timeout=timeout,
            **kwargs,
        )

    def create_partition(
        self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs
    ):
        """Create a partition through the connection's ``create_partition``.

        Args:
            collection_name (``str``): The name of the collection.
            partition_name (``str``): The name of the partition to create.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().create_partition(
            collection_name, partition_name, timeout=timeout, **kwargs
        )

    def drop_partition(
        self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs
    ):
        """Drop a partition through the connection's ``drop_partition``.

        Args:
            collection_name (``str``): The name of the collection.
            partition_name (``str``): The name of the partition to drop.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().drop_partition(
            collection_name, partition_name, timeout=timeout, **kwargs
        )

    def has_partition(
        self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs
    ) -> bool:
        """Return the connection's ``has_partition`` result for a partition.

        Args:
            collection_name (``str``): The name of the collection.
            partition_name (``str``): The name of the partition.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        return self._get_connection().has_partition(
            collection_name, partition_name, timeout=timeout, **kwargs
        )

    def list_partitions(
        self, collection_name: str, timeout: Optional[float] = None, **kwargs
    ) -> List[str]:
        """Return the connection's ``list_partitions`` result for a collection.

        Args:
            collection_name (``str``): The name of the collection.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        return self._get_connection().list_partitions(collection_name, timeout=timeout, **kwargs)

    def load_partitions(
        self,
        collection_name: str,
        partition_names: Union[str, List[str]],
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Load one or more partitions through the connection's ``load_partitions``.

        Args:
            collection_name (``str``): The name of the collection.
            partition_names (``str`` or ``List[str]``): A single name is wrapped into a
                one-element list; anything else is passed through as given.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        names = [partition_names] if isinstance(partition_names, str) else partition_names
        self._get_connection().load_partitions(collection_name, names, timeout=timeout, **kwargs)

    def release_partitions(
        self,
        collection_name: str,
        partition_names: Union[str, List[str]],
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Release one or more partitions through the connection's ``release_partitions``.

        Args:
            collection_name (``str``): The name of the collection.
            partition_names (``str`` or ``List[str]``): A single name is wrapped into a
                one-element list; anything else is passed through as given.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        names = [partition_names] if isinstance(partition_names, str) else partition_names
        self._get_connection().release_partitions(
            collection_name, names, timeout=timeout, **kwargs
        )

    def get_partition_stats(
        self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs
    ) -> Dict:
        """Return the statistics of a partition as a plain dict.

        Args:
            collection_name (``str``): The name of the collection.
            partition_name (``str``): The name of the partition; must be a ``str``.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.

        Returns:
            Dict: ``key -> value`` for every entry returned by the connection's
            ``get_partition_stats``; ``row_count``, when present, is converted with ``int``.

        Raises:
            TypeError: If ``partition_name`` is not a ``str``.
        """
        conn = self._get_connection()
        if not isinstance(partition_name, str):
            raise TypeError(
                f"wrong type of argument 'partition_name', str expected, got '{type(partition_name).__name__}'"
            )

        raw_stats = conn.get_partition_stats(
            collection_name, partition_name, timeout=timeout, **kwargs
        )
        stats_by_key = {}
        for entry in raw_stats:
            stats_by_key[entry.key] = entry.value
        if "row_count" in stats_by_key:
            stats_by_key["row_count"] = int(stats_by_key["row_count"])
        return stats_by_key

    def create_user(self, user_name: str, password: str, timeout: Optional[float] = None, **kwargs):
        """Create a user through the connection's ``create_user`` and return its result.

        Args:
            user_name (``str``): The name of the user.
            password (``str``): The password of the user.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        return self._get_connection().create_user(user_name, password, timeout=timeout, **kwargs)

    def drop_user(self, user_name: str, timeout: Optional[float] = None, **kwargs):
        """Drop a user through the connection's ``delete_user`` and return its result.

        Args:
            user_name (``str``): The name of the user to drop.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        return self._get_connection().delete_user(user_name, timeout=timeout, **kwargs)

    def update_password(
        self,
        user_name: str,
        old_password: str,
        new_password: str,
        reset_connection: Optional[bool] = False,
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Change a user's password, optionally re-authorizing the current connection.

        Args:
            user_name (``str``): The name of the user.
            old_password (``str``): The current password.
            new_password (``str``): The new password.
            reset_connection (``bool``, optional): When truthy, after the update the
                connection's ``_setup_authorization_interceptor(user_name, new_password, None)``
                and ``_setup_grpc_channel()`` are called.
            timeout (``float``, optional): Forwarded to the connection's ``update_password``.
            **kwargs: Extra keyword arguments forwarded to ``update_password``.
        """
        conn = self._get_connection()
        conn.update_password(user_name, old_password, new_password, timeout=timeout, **kwargs)
        if not reset_connection:
            return

        # Re-run the connection's own setup steps with the new credentials.
        conn._setup_authorization_interceptor(user_name, new_password, None)
        conn._setup_grpc_channel()

    def list_users(self, timeout: Optional[float] = None, **kwargs):
        """Return the connection's ``list_usernames`` result.

        Args:
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        return self._get_connection().list_usernames(timeout=timeout, **kwargs)

    def describe_user(self, user_name: str, timeout: Optional[float] = None, **kwargs):
        """Describe a user together with the roles of its first result group.

        Args:
            user_name (``str``): The name of the user.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.

        Returns:
            dict: ``{"user_name": user_name, "roles": <roles>}`` built from the first group
            of the connection's ``select_one_user`` result, or ``{}`` when there are no groups.
        """
        conn = self._get_connection()
        # Connection errors propagate unchanged. The former `raise ex from ex` wrapper
        # added nothing except making the exception its own __cause__.
        # The positional True is presumably "include role info" -- confirm against the
        # connection API.
        res = conn.select_one_user(user_name, True, timeout=timeout, **kwargs)
        if res.groups:
            item = res.groups[0]
            return {"user_name": user_name, "roles": item.roles}
        return {}

    def grant_role(self, user_name: str, role_name: str, timeout: Optional[float] = None, **kwargs):
        """Grant a role to a user through the connection's ``add_user_to_role``.

        Args:
            user_name (``str``): The name of the user.
            role_name (``str``): The name of the role.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().add_user_to_role(user_name, role_name, timeout=timeout, **kwargs)

    def revoke_role(
        self, user_name: str, role_name: str, timeout: Optional[float] = None, **kwargs
    ):
        """Revoke a role from a user through the connection's ``remove_user_from_role``.

        Args:
            user_name (``str``): The name of the user.
            role_name (``str``): The name of the role.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().remove_user_from_role(
            user_name, role_name, timeout=timeout, **kwargs
        )

    def create_role(self, role_name: str, timeout: Optional[float] = None, **kwargs):
        """Create a role through the connection's ``create_role``.

        Args:
            role_name (``str``): The name of the role.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().create_role(role_name, timeout=timeout, **kwargs)

    def drop_role(
        self, role_name: str, force_drop: bool = False, timeout: Optional[float] = None, **kwargs
    ):
        """Drop a role through the connection's ``drop_role``.

        Args:
            role_name (``str``): The name of the role.
            force_drop (``bool``): Passed as ``force_drop=``. Defaults to ``False``.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().drop_role(
            role_name, force_drop=force_drop, timeout=timeout, **kwargs
        )

    def describe_role(self, role_name: str, timeout: Optional[float] = None, **kwargs) -> Dict:
        """Describe a role and the privileges granted to it.

        Args:
            role_name (``str``): The name of the role.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged, except ``db_name``,
                which is removed and passed positionally (defaults to ``""``).

        Returns:
            Dict: ``{"role": role_name, "privileges": [...]}`` where each privilege is
            ``dict(group)`` for a group of the connection's ``select_grant_for_one_role`` result.
        """
        conn = self._get_connection()
        db_name = kwargs.pop("db_name", "")
        # Connection errors propagate unchanged. The former `raise ex from ex` wrapper
        # added nothing except making the exception its own __cause__.
        res = conn.select_grant_for_one_role(role_name, db_name, timeout=timeout, **kwargs)
        return {"role": role_name, "privileges": [dict(i) for i in res.groups]}

    def list_roles(self, timeout: Optional[float] = None, **kwargs):
        """List the names of all roles.

        Args:
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.

        Returns:
            list: The ``role_name`` of every group in the connection's
            ``select_all_role`` result.
        """
        conn = self._get_connection()
        # Connection errors propagate unchanged. The former `raise ex from ex` wrapper
        # added nothing except making the exception its own __cause__.
        # The positional False is presumably "include user info" -- confirm against the
        # connection API.
        res = conn.select_all_role(False, timeout=timeout, **kwargs)
        return [g.role_name for g in res.groups]

    def grant_privilege(
        self,
        role_name: str,
        object_type: str,
        privilege: str,
        object_name: str,
        db_name: Optional[str] = "",
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Grant a privilege to a role through the connection's ``grant_privilege``.

        Args:
            role_name (``str``): The name of the role.
            object_type (``str``): The object type.
            privilege (``str``): The privilege to grant.
            object_name (``str``): The object name.
            db_name (``str``, optional): The database name. Defaults to ``""``.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        # The connection takes object_name before privilege, unlike this method's signature.
        positional = (role_name, object_type, object_name, privilege, db_name)
        self._get_connection().grant_privilege(*positional, timeout=timeout, **kwargs)

    def revoke_privilege(
        self,
        role_name: str,
        object_type: str,
        privilege: str,
        object_name: str,
        db_name: Optional[str] = "",
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Revoke a privilege from a role through the connection's ``revoke_privilege``.

        Args:
            role_name (``str``): The name of the role.
            object_type (``str``): The object type.
            privilege (``str``): The privilege to revoke.
            object_name (``str``): The object name.
            db_name (``str``, optional): The database name. Defaults to ``""``.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        # The connection takes object_name before privilege, unlike this method's signature.
        positional = (role_name, object_type, object_name, privilege, db_name)
        self._get_connection().revoke_privilege(*positional, timeout=timeout, **kwargs)

    def grant_privilege_v2(
        self,
        role_name: str,
        privilege: str,
        collection_name: str,
        db_name: Optional[str] = None,
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Grant a privilege or a privilege group to a role.

        Args:
            role_name (``str``): The name of the role.
            privilege (``str``): The privilege or privilege group to grant.
            collection_name (``str``): The name of the collection.
            db_name (``str``, optional): The name of the database. The default database
                is used when it is not specified.
            timeout (``float``, optional): An optional duration of time in seconds to allow
                for the RPC. When timeout is None, the client waits until the server
                responds or an error occurs.
            **kwargs: Extra keyword arguments forwarded unchanged to the connection call.

        Raises:
            MilvusException: If anything goes wrong.
        """
        self._get_connection().grant_privilege_v2(
            role_name, privilege, collection_name, db_name=db_name, timeout=timeout, **kwargs
        )

    def revoke_privilege_v2(
        self,
        role_name: str,
        privilege: str,
        collection_name: str,
        db_name: Optional[str] = None,
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Revoke a privilege or a privilege group from a role.

        Args:
            role_name (``str``): The name of the role.
            privilege (``str``): The privilege or privilege group to revoke.
            collection_name (``str``): The name of the collection.
            db_name (``str``, optional): The name of the database. The default database
                is used when it is not specified.
            timeout (``float``, optional): An optional duration of time in seconds to allow
                for the RPC. When timeout is None, the client waits until the server
                responds or an error occurs.
            **kwargs: Extra keyword arguments forwarded unchanged to the connection call.

        Raises:
            MilvusException: If anything goes wrong.
        """
        self._get_connection().revoke_privilege_v2(
            role_name, privilege, collection_name, db_name=db_name, timeout=timeout, **kwargs
        )

    def create_alias(
        self, collection_name: str, alias: str, timeout: Optional[float] = None, **kwargs
    ):
        """Create an alias for a collection through the connection's ``create_alias``.

        Args:
            collection_name (``str``): The name of the collection.
            alias (``str``): The alias to create.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().create_alias(collection_name, alias, timeout=timeout, **kwargs)

    def drop_alias(self, alias: str, timeout: Optional[float] = None, **kwargs):
        """Drop an alias through the connection's ``drop_alias``.

        Args:
            alias (``str``): The alias to drop.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().drop_alias(alias, timeout=timeout, **kwargs)

    def alter_alias(
        self, collection_name: str, alias: str, timeout: Optional[float] = None, **kwargs
    ):
        """Alter an alias through the connection's ``alter_alias``.

        Args:
            collection_name (``str``): The name of the collection.
            alias (``str``): The alias to alter.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().alter_alias(collection_name, alias, timeout=timeout, **kwargs)

    def describe_alias(self, alias: str, timeout: Optional[float] = None, **kwargs) -> Dict:
        """Return the connection's ``describe_alias`` result for an alias.

        Args:
            alias (``str``): The alias to describe.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        return self._get_connection().describe_alias(alias, timeout=timeout, **kwargs)

    def list_aliases(
        self, collection_name: str = "", timeout: Optional[float] = None, **kwargs
    ) -> List[str]:
        """Return the connection's ``list_aliases`` result.

        Args:
            collection_name (``str``, optional): The name of the collection. Defaults to ``""``.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        return self._get_connection().list_aliases(collection_name, timeout=timeout, **kwargs)

    # Deprecated: kept as an alias of use_database.
    def using_database(self, db_name: str, **kwargs):
        """Deprecated alias of ``use_database``; forwards all arguments to it.

        Args:
            db_name (``str``): The name of the database.
            **kwargs: Extra keyword arguments forwarded unchanged to ``use_database``.
        """
        self.use_database(db_name, **kwargs)

    def use_database(self, db_name: str, **kwargs):
        """Switch the connection to another database via its ``reset_db_name``.

        Args:
            db_name (``str``): The name of the database.
            **kwargs: Accepted but not used by this method.
        """
        self._get_connection().reset_db_name(db_name)

    def create_database(
        self,
        db_name: str,
        properties: Optional[dict] = None,
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Create a database through the connection's ``create_database``.

        Args:
            db_name (``str``): The name of the database, passed as ``db_name=``.
            properties (``dict``, optional): Passed as ``properties=``.
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().create_database(
            db_name=db_name,
            properties=properties,
            timeout=timeout,
            **kwargs,
        )

    def drop_database(self, db_name: str, **kwargs):
        """Drop a database through the connection's ``drop_database``.

        Args:
            db_name (``str``): The name of the database.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().drop_database(db_name, **kwargs)

    def list_databases(self, timeout: Optional[float] = None, **kwargs) -> List[str]:
        """Return the connection's ``list_database`` result.

        Args:
            timeout (``float``, optional): Forwarded to the connection call.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        return self._get_connection().list_database(timeout=timeout, **kwargs)

    def describe_database(self, db_name: str, **kwargs) -> dict:
        """Return the connection's ``describe_database`` result for a database.

        Args:
            db_name (``str``): The name of the database.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        return self._get_connection().describe_database(db_name, **kwargs)

    def alter_database_properties(self, db_name: str, properties: dict, **kwargs):
        """Alter database properties through the connection's ``alter_database``.

        Args:
            db_name (``str``): The name of the database.
            properties (``dict``): The properties, passed positionally.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().alter_database(db_name, properties, **kwargs)

    def drop_database_properties(self, db_name: str, property_keys: List[str], **kwargs):
        """Drop database properties through the connection's ``drop_database_properties``.

        Args:
            db_name (``str``): The name of the database.
            property_keys (``List[str]``): The property keys, passed positionally.
            **kwargs: Extra keyword arguments forwarded unchanged.
        """
        self._get_connection().drop_database_properties(db_name, property_keys, **kwargs)

    def flush(
        self,
        collection_name: str,
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Seal all segments in the collection; later inserts are written into new segments.

        Args:
            collection_name (``str``): The name of the collection. It is passed to the
                connection's ``flush`` as a one-element list.
            timeout (``float``, optional): An optional duration of time in seconds to allow
                for the RPCs. If timeout is not set, the client keeps waiting until the
                server responds or an error occurs.
            **kwargs: Extra keyword arguments forwarded unchanged to the connection call.

        Raises:
            MilvusException: If anything goes wrong.
        """
        target_collections = [collection_name]
        self._get_connection().flush(target_collections, timeout=timeout, **kwargs)

    def compact(
        self,
        collection_name: str,
        is_clustering: Optional[bool] = False,
        is_l0: Optional[bool] = False,
        timeout: Optional[float] = None,
        **kwargs,
    ) -> int:
        """Compact (merge) the small segments of a collection.

        Args:
            collection_name (``str``): The name of the collection.
            is_clustering (``bool``, optional): Option to trigger clustering compaction.
            is_l0 (``bool``, optional): Passed to the connection's ``compact`` as ``is_l0=``.
            timeout (``float``, optional): An optional duration of time in seconds to allow
                for the RPC. When timeout is None, the client waits until the server
                responds or an error occurs.
            **kwargs: Extra keyword arguments forwarded unchanged to the connection call.

        Raises:
            MilvusException: If anything goes wrong.

        Returns:
            int: An integer representing the server's compaction job. Use this job ID
            for subsequent state inquiries.
        """
        job_id = self._get_connection().compact(
            collection_name,
            is_clustering=is_clustering,
            is_l0=is_l0,
            timeout=timeout,
            **kwargs,
        )
        return job_id

    def get_compaction_state(
        self,
        job_id: int,
        timeout: Optional[float] = None,
        **kwargs,
    ) -> str:
        """Get the state of a compaction job.

        Args:
            job_id (``int``): The ID of the compaction job.
            timeout (``float``, optional): An optional duration of time in seconds to allow
                for the RPC. When timeout is None, the client waits until the server
                responds or an error occurs.
            **kwargs: Extra keyword arguments forwarded unchanged to the connection call.

        Raises:
            MilvusException: If anything goes wrong.

        Returns:
            str: The ``state_name`` of the connection's ``get_compaction_state`` result.
            Possible values are "UndefiedState", "Executing" and "Completed".
        """
        return self._get_connection().get_compaction_state(
            job_id, timeout=timeout, **kwargs
        ).state_name

    def get_server_version(
        self,
        timeout: Optional[float] = None,
        **kwargs,
    ) -> str:
        """Get the running server's version.

        Args:
            timeout (``float``, optional): A duration of time in seconds to allow for the
                RPC. If timeout is None, the client keeps waiting until the server
                responds or an error occurs.
            **kwargs: Extra keyword arguments forwarded unchanged to the connection call.

        Returns:
            str: A string representing the server's version.

        Raises:
            MilvusException: If anything goes wrong
        """
        return self._get_connection().get_server_version(timeout=timeout, **kwargs)

    def create_privilege_group(
        self,
        group_name: str,
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Create a new privilege group.

        Args:
            group_name (``str``): The name of the privilege group.
            timeout (``float``, optional): An optional duration of time in seconds to allow
                for the RPC. When timeout is None, the client waits until the server
                responds or an error occurs.
            **kwargs: Extra keyword arguments forwarded unchanged to the connection call.

        Raises:
            MilvusException: If anything goes wrong.
        """
        self._get_connection().create_privilege_group(group_name, timeout=timeout, **kwargs)

    def drop_privilege_group(
        self,
        group_name: str,
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Drop a privilege group.

        Args:
            group_name (``str``): The name of the privilege group.
            timeout (``float``, optional): An optional duration of time in seconds to allow
                for the RPC. When timeout is None, the client waits until the server
                responds or an error occurs.
            **kwargs: Extra keyword arguments forwarded unchanged to the connection call.

        Raises:
            MilvusException: If anything goes wrong.
        """
        self._get_connection().drop_privilege_group(group_name, timeout=timeout, **kwargs)

    def list_privilege_groups(
        self,
        timeout: Optional[float] = None,
        **kwargs,
    ) -> List[Dict[str, str]]:
        """List all privilege groups.

        Args:
            timeout (``float``, optional): An optional duration of time in seconds to allow
                for the RPC. When timeout is None, the client waits until the server
                responds or an error occurs.
            **kwargs: Extra keyword arguments forwarded unchanged to the connection call.

        Returns:
            List[Dict[str, str]]: One dict per group with the keys ``"privilege_group"``
            and ``"privileges"``, taken from the attributes of the same name.

        Raises:
            MilvusException: If anything goes wrong.
        """
        listing = self._get_connection().list_privilege_groups(timeout=timeout, **kwargs)
        return [
            {"privilege_group": group.privilege_group, "privileges": group.privileges}
            for group in listing.groups
        ]

    def add_privileges_to_group(
        self,
        group_name: str,
        privileges: List[str],
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Add privileges to a privilege group.

        Args:
            group_name (``str``): The name of the privilege group.
            privileges (``List[str]``): A list of privileges to be added to the group.
                Privileges should be the same type in a group, otherwise an exception
                is raised.
            timeout (``float``, optional): An optional duration of time in seconds to allow
                for the RPC. When timeout is None, the client waits until the server
                responds or an error occurs.
            **kwargs: Extra keyword arguments forwarded unchanged to the connection call.

        Raises:
            MilvusException: If anything goes wrong.
        """
        self._get_connection().add_privileges_to_group(
            group_name, privileges, timeout=timeout, **kwargs
        )

    def remove_privileges_from_group(
        self,
        group_name: str,
        privileges: List[str],
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """Remove privileges from a privilege group.

        Args:
            group_name (``str``): The name of the privilege group.
            privileges (``List[str]``): A list of privileges to be removed from the group.
            timeout (``float``, optional): An optional duration of time in seconds to allow
                for the RPC. When timeout is None, the client waits until the server
                responds or an error occurs.
            **kwargs: Extra keyword arguments forwarded unchanged to the connection call.

        Raises:
            MilvusException: If anything goes wrong.
        """
        self._get_connection().remove_privileges_from_group(
            group_name, privileges, timeout=timeout, **kwargs
        )

    def create_resource_group(self, name: str, timeout: Optional[float] = None, **kwargs):
        """Create a resource group.

        It succeeds whether or not the resource group already exists.

        Args:
            name: The name of the resource group.
            timeout: The timeout value in seconds, passed positionally. Defaults to None.
            **kwargs: Extra keyword arguments forwarded unchanged to the connection call.
        Returns:
            Whatever the connection's ``create_resource_group`` returns.
        Raises:
            MilvusException: If anything goes wrong.
        """
        return self._get_connection().create_resource_group(name, timeout, **kwargs)

    def update_resource_groups(
        self,
        configs: Dict[str, ResourceGroupConfig],
        timeout: Optional[float] = None,
    ):
        """Update resource groups from the provided configurations.

        Args:
            configs: A mapping of resource group names to their configurations.
            timeout: The timeout value in seconds, passed positionally. Defaults to None.
        Returns:
            Whatever the connection's ``update_resource_groups`` returns.
        Raises:
            MilvusException: If anything goes wrong.
        """
        return self._get_connection().update_resource_groups(configs, timeout)

    def drop_resource_group(
        self,
        name: str,
        timeout: Optional[float] = None,
    ):
        """Drop a resource group.

        It succeeds if the resource group exists and is empty; otherwise it fails.

        Args:
            name: The name of the resource group.
            timeout: The timeout value in seconds, passed positionally. Defaults to None.
        Returns:
            Whatever the connection's ``drop_resource_group`` returns.
        Raises:
            MilvusException: If anything goes wrong.
        """
        return self._get_connection().drop_resource_group(name, timeout)

    def describe_resource_group(self, name: str, timeout: Optional[float] = None):
        """Describe a resource group.

        Args:
            name: The name of the resource group.
            timeout: The timeout value in seconds, passed positionally. Defaults to None.
        Returns:
            ResourceGroupInfo: The detail info of the resource group.
        Raises:
            MilvusException: If anything goes wrong.
        """
        return self._get_connection().describe_resource_group(name, timeout)

    def list_resource_groups(self, timeout: Optional[float] = None):
        """List all resource group names.

        Args:
            timeout: The timeout value in seconds, passed positionally. Defaults to None.
        Returns:
            list[str]: all resource group names
        Raises:
            MilvusException: If anything goes wrong.
        """
        return self._get_connection().list_resource_groups(timeout)

    def transfer_replica(
        self,
        source_group: str,
        target_group: str,
        collection_name: str,
        num_replicas: int,
        timeout: Optional[float] = None,
    ):
        """Transfer replicas from a source resource group to a target resource group.

        Args:
            source_group: source resource group name
            target_group: target resource group name
            collection_name: name of the collection the replicas belong to
            num_replicas: number of replicas to transfer
            timeout: The timeout value in seconds, passed positionally. Defaults to None.

        Returns:
            Whatever the connection's ``transfer_replica`` returns.

        Raises:
            MilvusException: If anything goes wrong.
        """
        positional = (source_group, target_group, collection_name, num_replicas, timeout)
        return self._get_connection().transfer_replica(*positional)

    def describe_replica(
        self, collection_name: str, timeout: Optional[float] = None, **kwargs
    ) -> List[ReplicaInfo]:
        """Get the current loaded replica information.

        Args:
            collection_name (``str``): The name of the given collection.
            timeout (``float``, optional): An optional duration of time in seconds to allow
                for the RPC. When timeout is None, the client waits until the server
                responds or an error occurs.
            **kwargs: Extra keyword arguments forwarded unchanged to the connection call.
        Returns:
            List[ReplicaInfo]: All the replica information.
        """
        return self._get_connection().describe_replica(collection_name, timeout=timeout, **kwargs)

    def run_analyzer(
        self,
        texts: Union[str, List[str]],
        analyzer_params: Optional[Union[str, Dict]] = None,
        with_hash: bool = False,
        with_detail: bool = False,
        collection_name: Optional[str] = None,
        field_name: Optional[str] = None,
        analyzer_names: Optional[Union[str, List[str]]] = None,
        timeout: Optional[float] = None,
    ):
        """Run analyzer. Return result tokens of analysis.

        Every argument except ``texts`` is passed to the connection's ``run_analyzer``
        as a keyword argument of the same name.

        Args:
            texts(``str``,``List[str]``): The input text (string or string list).
            analyzer_params(``str``,``Dict``,``None``): The parameters of analyzer.
            with_hash(``bool``): Forwarded as ``with_hash``. Defaults to False.
            with_detail(``bool``): Forwarded as ``with_detail``. Defaults to False.
            collection_name(``str``, optional): Forwarded as ``collection_name``.
            field_name(``str``, optional): Forwarded as ``field_name``.
            analyzer_names(``str``,``List[str]``, optional): Forwarded as ``analyzer_names``.
            timeout(``float``, optional): The timeout value in seconds. Defaults to None.
        Returns:
                (``List[str]``,``List[List[str]]``): The result tokens of analysis.
        """
        conn = self._get_connection()
        return conn.run_analyzer(
            texts,
            analyzer_params=analyzer_params,
            with_hash=with_hash,
            with_detail=with_detail,
            collection_name=collection_name,
            field_name=field_name,
            analyzer_names=analyzer_names,
            timeout=timeout,
        )

    def update_replicate_configuration(
        self,
        clusters: Optional[List[Dict]] = None,
        cross_cluster_topology: Optional[List[Dict]] = None,
        timeout: Optional[float] = None,
        **kwargs,
    ):
        """
        Update replication configuration across Milvus clusters.

        All arguments are passed by keyword to the connection's
        ``update_replicate_configuration``; no validation happens in this method.

        Args:
            clusters (List[Dict], optional): List of cluster configurations.
            Each dict should contain:
                - cluster_id (str): Unique identifier for the cluster
                - connection_param (Dict): Connection parameters with 'uri' and 'token'
                - pchannels (List[str], optional): Physical channels for the cluster

            cross_cluster_topology (List[Dict], optional): List of replication relationships.
            Each dict should contain:
                - source_cluster_id (str): ID of the source cluster
                - target_cluster_id (str): ID of the target cluster

            timeout (float, optional): An optional duration of time in seconds to allow for the RPC
            **kwargs: Additional arguments

        Returns:
            Status: The status of the operation

        Raises:
            ParamError: If neither clusters nor cross_cluster_topology is provided
            MilvusException: If the operation fails

        Examples:
            client.update_replicate_configuration(
                clusters=[
                    {
                        "cluster_id": "source_cluster",
                        "connection_param": {
                            "uri": "http://source:19530",
                            "token": "source_token"
                        },
                        "pchannels": ["source_pchannel1", "source_pchannel2"]
                    },
                    {
                        "cluster_id": "target_cluster",
                        "connection_param": {
                            "uri": "http://target:19530",
                            "token": "target_token"
                        },
                        "pchannels": ["target_pchannel1", "target_pchannel2"]
                    }
                ],
                cross_cluster_topology=[
                    {
                        "source_cluster_id": "source_cluster",
                        "target_cluster_id": "target_cluster"
                    }
                ]
            )
        """
        conn = self._get_connection()
        return conn.update_replicate_configuration(
            clusters=clusters,
            cross_cluster_topology=cross_cluster_topology,
            timeout=timeout,
            **kwargs,
        )

    def flush_all(self, timeout: Optional[float] = None, **kwargs) -> None:
        """Request a flush of all collections.

        Delegates to the connection handler's ``flush_all``; whatever that call
        returns is discarded, so this method always returns ``None``.

        Args:
            timeout (Optional[float]): An optional duration of time in seconds to allow for the RPC.
            **kwargs: Additional arguments, passed through to the connection handler.
        """
        conn = self._get_connection()
        conn.flush_all(timeout=timeout, **kwargs)

    def get_flush_all_state(self, timeout: Optional[float] = None, **kwargs) -> bool:
        """Query the state of a flush-all operation.

        Delegates to the connection handler's ``get_flush_all_state`` and returns
        its result unchanged.

        Args:
            timeout (Optional[float]): An optional duration of time in seconds to allow for the RPC.
            **kwargs: Additional arguments, passed through to the connection handler.

        Returns:
            bool: True if flush all operation is completed, False otherwise.
        """
        conn = self._get_connection()
        flushed = conn.get_flush_all_state(timeout=timeout, **kwargs)
        return flushed

    def list_loaded_segments(
        self,
        collection_name: str,
        timeout: Optional[float] = None,
        **kwargs,
    ) -> List[LoadedSegmentInfo]:
        """List loaded segments for a collection.

        Fetches the query segment info from the connection handler and wraps each
        entry in a ``LoadedSegmentInfo``. The collection name in each result is the
        ``collection_name`` argument given by the caller, not a value read from
        the response.

        Args:
            collection_name (str): The name of the collection.
            timeout (Optional[float]): An optional duration of time in seconds to allow for the RPC.
            **kwargs: Additional arguments, passed through to the connection handler.

        Returns:
            List[LoadedSegmentInfo]: One entry per segment info returned by the
            connection handler, in the same order.
        """

        def _as_loaded_info(seg) -> LoadedSegmentInfo:
            # Positional order must match the LoadedSegmentInfo constructor.
            return LoadedSegmentInfo(
                seg.segmentID,
                seg.collectionID,
                collection_name,
                seg.num_rows,
                seg.is_sorted,
                seg.state,
                seg.level,
                seg.storage_version,
                seg.mem_size,
            )

        conn = self._get_connection()
        segments = conn.get_query_segment_info(collection_name, timeout=timeout, **kwargs)
        return [_as_loaded_info(seg) for seg in segments]

    def list_persistent_segments(
        self,
        collection_name: str,
        timeout: Optional[float] = None,
        **kwargs,
    ) -> List[SegmentInfo]:
        """List persistent segments for a collection.

        Fetches the persistent segment infos from the connection handler and wraps
        each entry in a ``SegmentInfo``. The collection name in each result is the
        ``collection_name`` argument given by the caller, not a value read from
        the response.

        Args:
            collection_name (str): The name of the collection.
            timeout (Optional[float]): An optional duration of time in seconds to allow for the RPC.
            **kwargs: Additional arguments, passed through to the connection handler.

        Returns:
            List[SegmentInfo]: One entry per segment info returned by the
            connection handler, in the same order.
        """

        def _as_segment_info(seg) -> SegmentInfo:
            # Positional order must match the SegmentInfo constructor.
            return SegmentInfo(
                seg.segmentID,
                seg.collectionID,
                collection_name,
                seg.num_rows,
                seg.is_sorted,
                seg.state,
                seg.level,
                seg.storage_version,
            )

        conn = self._get_connection()
        segments = conn.get_persistent_segment_infos(collection_name, timeout=timeout, **kwargs)
        return [_as_segment_info(seg) for seg in segments]

    def get_compaction_plans(
        self,
        job_id: int,
        timeout: Optional[float] = None,
        **kwargs,
    ) -> CompactionPlans:
        """Retrieve the compaction plans of a compaction job.

        Delegates to the connection handler's ``get_compaction_plans`` and returns
        its result unchanged.

        Args:
            job_id (int): The ID of the compaction job.
            timeout (Optional[float]): An optional duration of time in seconds to allow for the RPC.
            **kwargs: Additional arguments, passed through to the connection handler.

        Returns:
            CompactionPlans: The compaction plans for the specified job.
        """
        conn = self._get_connection()
        plans = conn.get_compaction_plans(job_id, timeout=timeout, **kwargs)
        return plans
